{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# flower REST API\n",
    "\n",
    "This document shows how to use the flower [REST API](https://github.com/mher/flower#api). \n",
    "\n",
    "We will use [requests](http://www.python-requests.org/en/latest/) for accessing the API. (See [here](http://www.python-requests.org/en/latest/user/install/) on how to install it.)    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Code\n",
    "We'll use the following code throughout the documentation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## tasks.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from celery import Celery\n",
    "from time import sleep\n",
    "\n",
    "celery = Celery()\n",
    "celery.config_from_object({\n",
    "    'BROKER_URL': 'amqp://localhost',\n",
    "    'CELERY_RESULT_BACKEND': 'amqp://',\n",
    "    'CELERYD_POOL_RESTARTS': True,  # Required for /worker/pool/restart API\n",
    "})\n",
    "\n",
    "\n",
    "@celery.task\n",
    "def add(x, y):\n",
    "    return x + y\n",
    "\n",
    "\n",
    "@celery.task\n",
    "def sub(x, y):\n",
    "    sleep(30)  # Simulate work\n",
    "    return x -  y"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Running\n",
    "You'll need a celery worker instance and a flower instance running. In one terminal window run\n",
    "\n",
    "    celery worker --loglevel INFO -A proj -E --autoscale 10,3\n",
    "\n",
    "and in another terminal run\n",
    "\n",
    "    celery flower -A proj"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Tasks API\n",
    "The tasks API is *async*, meaning calls will return immediatly and you'll need to poll on task status."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Done once for the whole docs\n",
    "import requests, json\n",
    "api_root = 'http://localhost:5555/api'\n",
    "task_api = '{}/task'.format(api_root)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## async-apply"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/async-apply/tasks.add\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'state': u'PENDING', u'task-id': u'f4a53407-30f3-42af-869f-b7f8f4fbd684'}"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "args = {'args': [1, 2]}\n",
    "url = '{}/async-apply/tasks.add'.format(task_api)\n",
    "print(url)\n",
    "resp = requests.post(url, data=json.dumps(args))\n",
    "reply = resp.json()\n",
    "reply"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see that we created a new task and it's pending. Note that the API is *async*, meaning it won't wait until the task finish."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## apply"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For create task and wait results you can use 'apply' API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/apply/tasks.add\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'result': 3,\n",
       " u'state': u'SUCCESS',\n",
       " u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "args = {'args': [1, 2]}\n",
    "url = '{}/apply/tasks.add'.format(task_api)\n",
    "print(url)\n",
    "resp = requests.post(url, data=json.dumps(args))\n",
    "reply = resp.json()\n",
    "reply"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## result\n",
    "Gets the task result. This is *async* and will return immediatly even if the task didn't finish (with state 'PENDING')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/result/ced6fd57-419e-4b8e-8d99-0770be717cb4\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'result': 3,\n",
       " u'state': u'SUCCESS',\n",
       " u'task-id': u'ced6fd57-419e-4b8e-8d99-0770be717cb4'}"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/result/{}'.format(task_api, reply['task-id'])\n",
    "print(url)\n",
    "resp = requests.get(url)\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## revoke\n",
    "Revoke a running task."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/revoke/bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"Revoked 'bcb4ac2e-cb2d-4a4b-a402-8eb3a3b0c8e8'\"}"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Run a task\n",
    "args = {'args': [1, 2]}\n",
    "resp = requests.post('{}/async-apply/tasks.sub'.format(task_api), data=json.dumps(args))\n",
    "reply = resp.json()\n",
    "\n",
    "# Now revoke it\n",
    "url = '{}/revoke/{}'.format(task_api, reply['task-id'])\n",
    "print(url)\n",
    "resp = requests.post(url, data='terminate=True')\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## rate-limit\n",
    "Update [rate limit](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.rate_limit) for a task."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/rate-limit/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u'new rate limit set successfully'}"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "worker = 'miki-manjaro'  # You'll need to get the worker name from the worker API (seel below)\n",
    "url = '{}/rate-limit/{}'.format(task_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'taskname': 'tasks.add', 'ratelimit': '10'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## timeout\n",
    "Set timeout (both [hard](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.time_limit) and [soft](http://docs.celeryproject.org/en/latest/userguide/tasks.html#Task.soft_time_limit)) for a task."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/task/timeout/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u'time limits set successfully'}"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/timeout/{}'.format(task_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'taskname': 'tasks.add', 'hard': '3.14', 'soft': '3'})  # You can omit soft or hard\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Worker API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 55,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Once for the documentation\n",
    "worker_api = '{}/worker'.format(api_root)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## workers\n",
    "List workers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/workers\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'miki-manjaro': {u'completed_tasks': 0,\n",
       "  u'concurrency': 1,\n",
       "  u'queues': [u'celery'],\n",
       "  u'running_tasks': 0,\n",
       "  u'status': True}}"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/workers'.format(api_root)  # Only one not under /worker\n",
    "print(url)\n",
    "resp = requests.get(url)\n",
    "workers = resp.json()\n",
    "workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## pool/shutdown\n",
    "Shutdown a worker."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/shutdown/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u'Shutting down!'}"
      ]
     },
     "execution_count": 30,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "worker = workers.keys()[0]\n",
    "url = '{}/shutdown/{}'.format(worker_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url)\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## pool/restart\n",
    "Restart a worker pool, you need to have [CELERYD_POOL_RESTARTS](http://docs.celeryproject.org/en/latest/configuration.html#std:setting-CELERYD_POOL_RESTARTS) enabled in your configuration)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/pool/restart/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"Restarting 'miki-manjaro' worker's pool\"}"
      ]
     },
     "execution_count": 43,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "pool_api = '{}/pool'.format(worker_api)\n",
    "url = '{}/restart/{}'.format(pool_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url)\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## pool/grow\n",
    "Grows worker pool."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/pool/grow/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"Growing 'miki-manjaro' worker's pool\"}"
      ]
     },
     "execution_count": 53,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/grow/{}'.format(pool_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'n': '10'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## pool/shrink\n",
    "Shrink worker pool."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/pool/shrink/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"Shrinking 'miki-manjaro' worker's pool\"}"
      ]
     },
     "execution_count": 54,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/shrink/{}'.format(pool_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'n': '3'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## pool/autoscale\n",
    "[Autoscale](http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling) a pool."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/pool/autoscale/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"Autoscaling 'miki-manjaro' worker\"}"
      ]
     },
     "execution_count": 58,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/autoscale/{}'.format(pool_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'min': '3', 'max': '10'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## queue/add-consumer\n",
    "[Add a consumer](http://docs.celeryproject.org/en/latest/userguide/workers.html#std:control-add_consumer) to a queue."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/queue/add-consumer/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u\"add consumer u'jokes'\"}"
      ]
     },
     "execution_count": 62,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "queue_api = '{}/queue'.format(worker_api)\n",
    "url = '{}/add-consumer/{}'.format(queue_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'queue': 'jokes'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## queue/cancel-consumer\n",
    "[Cancel a consumer](http://docs.celeryproject.org/en/latest/userguide/workers.html#queues-cancelling-consumers) queue."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/worker/queue/cancel-consumer/miki-manjaro\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'message': u'no longer consuming from jokes'}"
      ]
     },
     "execution_count": 63,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/cancel-consumer/{}'.format(queue_api, worker)\n",
    "print(url)\n",
    "resp = requests.post(url, params={'queue': 'jokes'})\n",
    "resp.json()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Queue API\n",
    "\n",
    "We assume that we've two queues; the default one 'celery' and 'all'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "http://localhost:5555/api/queues/length\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{u'active_queues': [{u'messages': 2, u'name': u'all'},\n",
       "  {u'messages': 1, u'name': u'celery'}]}"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "url = '{}/queues/length'.format(api_root)\n",
    "print(url)\n",
    "resp = requests.get(url)\n",
    "resp.json()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
